package com.cmcc.hackson3.main;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cmcc.hackson3.user.QueueManager;

public class Consumer {
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
	private static final int MAXCNT = 10;
	private static final AtomicInteger  prefix = new AtomicInteger();
	private volatile boolean  started = false;
	private volatile boolean  terminal = false;
	
	//每个topic的消费者不超过10个
	private static final ConcurrentHashMap<String, AtomicInteger> consumerLock =
			 new ConcurrentHashMap<String, AtomicInteger>();
	private String topic;
	
	public Consumer(String topic) {
		this.topic = topic;
	}
	
	public void stop() {
		terminal = true;
	}
	
	private AtomicInteger getLock(String topic){
		AtomicInteger lock = consumerLock.get(topic);
		if(lock==null) {
			synchronized (consumerLock) {
				lock = consumerLock.get(topic);
				if(lock==null) {
					AtomicInteger item = new AtomicInteger();
					consumerLock.put(topic, item);
					return item;
				}else {
					return lock;
				}
			}
		}else {
			return lock;
		}
	}
	
	public void start() {
		if(started) return;
		AtomicInteger lock = getLock(topic);
		if(lock.get()>=MAXCNT) {
			logger.warn("启动失败。每个topic的消费者不超过10个");
			return;
		}
		
		Thread r = new Thread(null ,new Runnable() {

			@Override
			public void run() {
				while(!terminal) {
					try {
						LinkedBlockingQueue<Message> queue = RocketMQ.INS.getQueue(topic);
						Message msg = queue.take();
						msg.setConsumeTime(System.currentTimeMillis());
						if(msg.getTerminalTime() < System.currentTimeMillis()) {
							msg.setDiscardmsg("timeout.");
							QueueManager.INS.discard(topic, msg);
						}else {
							QueueManager.INS.outqueue(topic, msg);
						}
						//模拟业务线程消费消息，调用各种接口很耗时。
						Thread.sleep(RandomUtils.nextInt(100,350));
					} catch (InterruptedException e) {
						
					}
				}
			}
		},topic +"-"+ prefix.getAndIncrement());
		r.setDaemon(true);
		r.start();
		QueueManager.INS.consumerStarted(topic);
		lock.incrementAndGet();
	}

}
